Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する
こんにちは、CX 事業本部 Delivery 部の若槻です。
AWS Glue は、データの ETL 処理をサーバーレスで簡単に実装できるマネージドサービスです。
今回は、AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。
やってみた
Glue ジョブコード
import sys from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.dynamicframe import DynamicFrame from pyspark.sql.functions import current_timestamp args = getResolvedOptions( sys.argv, [ 'JOB_NAME', 'SOURCE_GLUE_TABLE_NAME', 'TARGET_GLUE_TABLE_NAME', 'GLUE_DATABASE_NAME' ] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) timestamp = current_timestamp() logger = glueContext.get_logger() logger.info(f"Timestamp={timestamp}") # データ読み込み GlueTableData = ( glueContext.create_dynamic_frame.from_catalog( database=args['GLUE_DATABASE_NAME'], table_name=args['SOURCE_GLUE_TABLE_NAME'], transformation_ctx='SourceData', ) ) # 読み込みデータがない場合はジョブを終了する if GlueTableData.count() == 0: logger.info("No data found in the source table. Exiting the job.") job.commit() GlueTableDataDF = GlueTableData.toDF() GlueTableDataDF.show() # データ加工 GlueTableDataDF = GlueTableDataDF.withColumn('timestamp', timestamp) GlueTableDataDF.show() GlueTableData = DynamicFrame.fromDF( GlueTableDataDF, glueContext, 'CreatedDynamicFrame' ) # データ書き込み glueContext.write_dynamic_frame.from_catalog( frame=GlueTableData, database=args['GLUE_DATABASE_NAME'], table_name=args['TARGET_GLUE_TABLE_NAME'], transformation_ctx='TargetData', additional_options={ 'enableUpdateCatalog': True, 'partitionKeys': [ 'year', 'month', 'day' ] } ) # パーティション作成 spark.sql(f"MSCK REPAIR TABLE {args['GLUE_DATABASE_NAME']}.{args['TARGET_GLUE_TABLE_NAME']}") job.commit()
- Glue ジョブでは PySpark を使用して処理を記述します。
- Glue テーブルからのデータの読み取りは
create_dynamic_frame.from_catalog
を使用します。- 今回は使用していませんが、
push_down_predicate
オプションを使うとパーティションキーで読み取りデータをフィルターできます。
- 今回は使用していませんが、
- Glue テーブルへのデータの書き込みは
write_dynamic_frame.from_catalog
を使用します。 write_dynamic_frame.from_catalog
ではパーティションの自動作成は行われないので、データ書込み後にMSCK REPAIR TABLE
クエリを合わせて実行しています。
CDK コード
import { aws_s3, RemovalPolicy, Stack, StackProps, CfnOutput, } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as glue_alpha from '@aws-cdk/aws-glue-alpha'; export class CdkSampleStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // Glue データベース const glueDatabase = new glue_alpha.Database(this, 'GlueDatabase'); // データソース S3 バケット const dataSourceBucket = new aws_s3.Bucket(this, 'DataSourceBucket', { removalPolicy: RemovalPolicy.DESTROY, autoDeleteObjects: true, }); // データソース Glue テーブル const sourceGlueTable = new glue_alpha.S3Table(this, 'SourceGlueTable', { database: glueDatabase, bucket: dataSourceBucket, s3Prefix: 'data/', dataFormat: glue_alpha.DataFormat.PARQUET, partitionKeys: [ { name: 'year', type: glue_alpha.Schema.STRING, }, { name: 'month', type: glue_alpha.Schema.STRING, }, { name: 'day', type: glue_alpha.Schema.STRING, }, ], columns: [ { name: 'id', type: glue_alpha.Schema.STRING, }, { name: 'airTemperature', type: glue_alpha.Schema.DOUBLE, }, ], }); // データターゲット S3 バケット const dataTargetBucket = new aws_s3.Bucket(this, 'DataTargetBucket', { removalPolicy: RemovalPolicy.DESTROY, autoDeleteObjects: true, }); // データターゲット Glue テーブル const targetGlueTable = new glue_alpha.S3Table(this, 'TargetGlueTable', { database: glueDatabase, bucket: dataTargetBucket, s3Prefix: 'data/', dataFormat: glue_alpha.DataFormat.PARQUET, partitionKeys: [ { name: 'year', type: glue_alpha.Schema.STRING, }, { name: 'month', type: glue_alpha.Schema.STRING, }, { name: 'day', type: glue_alpha.Schema.STRING, }, ], columns: [ { name: 'id', type: glue_alpha.Schema.STRING, }, { name: 'airTemperature', type: glue_alpha.Schema.DOUBLE, }, { name: 'timestamp', type: glue_alpha.Schema.TIMESTAMP, }, ], }); // Glue ジョブ const glueJob = new glue_alpha.Job(this, 'GlueJob', { executable: glue_alpha.JobExecutable.pythonEtl({ glueVersion: glue_alpha.GlueVersion.V4_0, pythonVersion: glue_alpha.PythonVersion.THREE, script: glue_alpha.Code.fromAsset('src/glue-job.py'), }), defaultArguments: { '--job-bookmark-option': 'job-bookmark-disable', '--enable-glue-datacatalog': 'true', '--GLUE_DATABASE_NAME': glueDatabase.databaseName, '--SOURCE_GLUE_TABLE_NAME': sourceGlueTable.tableName, '--TARGET_GLUE_TABLE_NAME': targetGlueTable.tableName, }, continuousLogging: { enabled: true }, enableProfilingMetrics: true, }); // Glue ジョブの権限設定 sourceGlueTable.grantRead(glueJob); targetGlueTable.grantWrite(glueJob); // 物理 ID 確認用 new CfnOutput(this, 'GlueDatabseName', { value: glueDatabase.databaseName, }); new CfnOutput(this, 'SourceGlueTableName', { value: sourceGlueTable.tableName, }); new CfnOutput(this, 'TargetGlueTableName', { value: targetGlueTable.tableName, }); new CfnOutput(this, 'GlueJobName', { value: glueJob.jobName, }); } }
- AWS CDK のコードは TypeScript で記述しています。
- AWS Glue の L2 Construct Class は現在は
@aws-cdk/aws-glue-alpha
でのみサポートされているので使用しています。 - 読み取り元および書き込み先の Glue テーブルの Glue ジョブへの権限付与は
grantRead
およびgrantWrite
メソッドが便利です。
動作確認
読み取り元テーブルに Athena でデータを作成します。
INSERT INTO cdksamplestackgluedatabase6a27012d.cdksamplestacksourcegluetable88243330 (year, month, day, id, airtemperature) VALUES ('2023', '10', '22', 'd001', 10.05), ('2023', '10', '23', 'd001', 5), ('2023', '10', '23', 'd002', 1.1)
データが作成されました。
SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacksourcegluetable88243330"
id | airtemperature | year | month | day |
---|---|---|---|---|
d001 | 10.05 | 2023 | 10 | 22 |
d001 | 5.0 | 2023 | 10 | 23 |
d002 | 1.1 | 2023 | 10 | 23 |
Glue ジョブを実行します。
JobRunId=$( aws glue start-job-run --job-name ${GLUE_JOB_NAME} \ --query JobRunId \ --output text )
2,3 分待つと、ジョブ実行が成功しました。
$ aws glue get-job-run --job-name ${GLUE_JOB_NAME} --run-id ${JobRunId} --query 'JobRun.JobRunState' "SUCCEEDED"
書き込み先テーブルを Athena でクエリすると、Glue ジョブによりデータが書き込まれていることが確認できました。
SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacktargetgluetable88a583f6"
id | airtemperature | timestamp | year | month | day |
---|---|---|---|---|---|
d001 | 5.0 | 2023-10-21 17:53:41.528 | 2023 | 10 | 23 |
d001 | 10.05 | 2023-10-21 17:53:41.528 | 2023 | 10 | 22 |
d002 | 1.1 | 2023-10-21 17:53:41.528 | 2023 | 10 | 23 |
おわりに
AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。
よくあるユースケースだと思うので参考になれば幸いです。
参考
以上